/*

 * Licensed to the Apache Software Foundation (ASF) under one

 * or more contributor license agreements.  See the NOTICE file

 * distributed with this work for additional information

 * regarding copyright ownership.  The ASF licenses this file

 * to you under the Apache License, Version 2.0 (the

 * "License"); you may not use this file except in compliance

 * with the License.  You may obtain a copy of the License at

 *

 *     http://www.apache.org/licenses/LICENSE-2.0

 *

 * Unless required by applicable law or agreed to in writing, software

 * distributed under the License is distributed on an "AS IS" BASIS,

 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

 * See the License for the specific language governing permissions and

 * limitations under the License.

 */

package com.bff.gaia.unified.sdk.extensions.sql.example;



import com.bff.gaia.unified.sdk.Pipeline;

import com.bff.gaia.unified.sdk.extensions.sql.SqlTransform;

import com.bff.gaia.unified.sdk.options.PipelineOptions;

import com.bff.gaia.unified.sdk.options.PipelineOptionsFactory;

import com.bff.gaia.unified.sdk.schemas.Schema;

import com.bff.gaia.unified.sdk.transforms.Create;

import com.bff.gaia.unified.sdk.transforms.MapElements;

import com.bff.gaia.unified.sdk.transforms.SerializableFunctions;

import com.bff.gaia.unified.sdk.transforms.SimpleFunction;

import com.bff.gaia.unified.sdk.values.*;

import com.bff.gaia.unified.sdk.values.PBegin;

import com.bff.gaia.unified.sdk.values.PCollection;

import com.bff.gaia.unified.sdk.values.PCollectionTuple;

import com.bff.gaia.unified.sdk.values.Row;

import com.bff.gaia.unified.sdk.values.TupleTag;



import javax.annotation.Nullable;



/**

 * This is a quick example, which uses Unified SQL DSL to create a data pipeline.

 *

 * <p>Run the example from the Unified source root with

 *

 * <pre>

 *   ./gradlew :unified-sdks-java-extensions-sql:runBasicExample

 * </pre>

 *

 * <p>The above command executes the example locally using the direct runner. Running the pipeline

 * on other runners requires additional setup and is out of scope for the SQL examples. Please

 * consult the Unified documentation on how to run pipelines.

 */

class UnifiedSqlExample {

  /**
   * Builds and runs the example pipeline.
   *
   * <p>The pipeline creates three in-memory rows, filters them with a SQL query (case 1), then
   * aggregates the filtered rows with a second SQL query (case 2). The result rows of each case
   * are printed to standard output, and the method blocks until the pipeline has finished.
   *
   * @param args command-line arguments, parsed into {@link PipelineOptions}
   */
  public static void main(String[] args) {
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).as(PipelineOptions.class);
    Pipeline p = Pipeline.create(options);

    // define the input row format
    Schema type =
        Schema.builder().addInt32Field("c1").addStringField("c2").addDoubleField("c3").build();

    Row row1 = Row.withSchema(type).addValues(1, "row", 1.0).build();
    Row row2 = Row.withSchema(type).addValues(2, "row", 2.0).build();
    Row row3 = Row.withSchema(type).addValues(3, "row", 3.0).build();

    // create a source PCollection with Create.of();
    PCollection<Row> inputTable =
        PBegin.in(p)
            .apply(
                Create.of(row1, row2, row3)
                    .withSchema(
                        type, SerializableFunctions.identity(), SerializableFunctions.identity()));

    // Case 1. run a simple SQL query over the input PCollection with SqlTransform.query;
    // a single input PCollection is referenced in the query as PCOLLECTION.
    PCollection<Row> outputStream =
        inputTable.apply(SqlTransform.query("select c1, c2, c3 from PCOLLECTION where c1 > 1"));

    // print the output record of case 1;
    // this step is named differently from the case 2 logging step so that transform names stay
    // unique within the pipeline.
    outputStream.apply(
        "log_case1_result",
        MapElements.via(
            new SimpleFunction<Row, Void>() {
              @Override
              public @Nullable Void apply(Row input) {
                // expect output:
                //  PCOLLECTION: [3, row, 3.0]
                //  PCOLLECTION: [2, row, 2.0]
                System.out.println("PCOLLECTION: " + input.getValues());
                return null;
              }
            }));

    // Case 2. run the query with SqlTransform.query over result PCollection of case 1;
    // the TupleTag id is the table name used in the query.
    PCollection<Row> outputStream2 =
        PCollectionTuple.of(new TupleTag<>("CASE1_RESULT"), outputStream)
            .apply(SqlTransform.query("select c2, sum(c3) from CASE1_RESULT group by c2"));

    // print the output record of case 2;
    outputStream2.apply(
        "log_case2_result",
        MapElements.via(
            new SimpleFunction<Row, Void>() {
              @Override
              public @Nullable Void apply(Row input) {
                // expect output:
                //  CASE1_RESULT: [row, 5.0]
                System.out.println("CASE1_RESULT: " + input.getValues());
                return null;
              }
            }));

    // run the pipeline and block until it completes
    p.run().waitUntilFinish();
  }
}